package com.xiaojie.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author xiaojie
 * @version 1.0
 * @description: TODO
 * @date 2021/10/13 21:55
 */
@Component
@Slf4j
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    /*
     *
     * @发送消息
     * @author xiaojie
     * @date 2021/10/14 15:23
     * @return void
     */
    @Transactional
    public void sendMSg() {
        System.out.println(">>>>>>>>>>>>>>>>>");
        for (int i = 0; i < 5; i++) {
            kafkaTemplate.send("xiaojie-topic", "test message>>>>>>>>>>>>>>>>>>>>>>" + i);
        }
    }

    @Transactional
    public void sendMSg2() {
        System.out.println(">>>>>>>>>>>>>>>>>");
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i=0;i<10;i++){
            executorService.execute(() -> {
                System.out.println("线程的名称为：》》》》"+Thread.currentThread().getName());
                for (int j = 0;j < 1000; j++) {
                    kafkaTemplate.send("my-topic-partition", "my-topic-partition message>>>>>>>>>>>>>>>>>>>>>>" + j);
                }
            });
        }
    }
    /*
     *
     * @param callbackMessage
     * @具有回调函数的发送消息
     * @author xiaojie
     * @date 2021/10/14 16:59
     * @return void
     */
    @Transactional
    public void sendMsgCallback(String callbackMessage) {
        kafkaTemplate.send("callback-topic", "xiaojie_key", callbackMessage).addCallback(success -> {
            //当消息发送成功的回调函数
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功>>>>>>>>>>>>>>>>>>>>>>>>>" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            //消息发送失败的回调函数
            System.out.println("消息发送失败，可以进行人工补偿");
        });
    }

    @Transactional
    public void sendMsgCallback1(String callbackMessage) {
        kafkaTemplate.send("callback-topic", "xiaojie_key", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败
                System.out.println("发送失败。。。。。。。。。。。");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                //分区信息
                Integer partition = result.getRecordMetadata().partition();
                //主题
                String topic = result.getProducerRecord().topic();
                String key = result.getProducerRecord().key();
                //发送成功
                System.out.println("发送成功。。。。。。。。。。。分区为：" + partition + ",主题topic:" + topic + ",key:" + key);
            }
        });
    }

    /**
     * @description: kafak事务提交 本地事务不需要事务管理器
     * @param:
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:35
     */

    public void sendTx() {
        kafkaTemplate.executeInTransaction(kafkaOperations -> {
            String msg = "这是一条测试事务的数据......";
            kafkaOperations.send("tx-topic", msg);
            int i = 1 / 0; //报错之后，由于事务存在，消息并不会发送到broker
            return null;
        });
    }

    @Transactional
    public void sendTestMSg() {
        for (int i = 0; i < 50; i++) {
            kafkaTemplate.send("test-topic", "weixin" ,"这是测试test-topic  weixin的第" + i + "条消息");
        }
    }

    @Transactional
    public void sendXiaojieMSg() {
        for (int i = 0; i < 50; i++) {
            kafkaTemplate.send("xiaojie-test-topic", "这是测试xiaojie-test-topic的第" + i + "条消息");
        }
    }

    /**
     * @description:  消息过滤器
     * @param:
     * @param: msg
     * @return: void
     * @author xiaojie
     * @date: 2021/10/16 0:59
     */
    @Transactional
    public void sendFilterMsg(String msg){
        kafkaTemplate.send("filter-topic",msg);
    }

}
